package cn.doitedu.rtmk.engine;

import cn.doitedu.common.pojo.EventBean;
import cn.doitedu.rtmk.functions.RuleProcessFunction;
import cn.doitedu.rtmk.pojo.RuleMetaBean;
import cn.doitedu.rtmk.utils.DataStreamUtil;
import cn.doitedu.rtmk.utils.StateDescriptors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Entrypoint {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setForceUnalignedCheckpoints(true);


        env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint");


        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 获取 用户行为事件 流
        DataStream<EventBean> events = DataStreamUtil.getUserEventStream(env);

        // 将数据按用户进行分发处理
        KeyedStream<EventBean, Integer> keyedEventsStream = events.keyBy(EventBean::getUserId);

        // 获取 规则资源流
        DataStream<RuleMetaBean> ruleMetaDataStream = DataStreamUtil.getRuleMetaDataStream(tEnv);

        // 将规则资源流广播出去
        BroadcastStream<RuleMetaBean> ruleBroadcast = ruleMetaDataStream.broadcast(StateDescriptors.ruleMetaBroadCastStateDesc);

        // 将用户行为事件流  connect  规则资源广播流
        keyedEventsStream
                .connect(ruleBroadcast)
                .process(new RuleProcessFunction())
                .print();


        env.execute();
    }
}
